package com.exini.dicom.streams

import akka.stream._
import akka.stream.stage._
import akka.util.ByteString

class Chunker(val chunkSize: Int) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in: Inlet[ByteString]                             = Inlet[ByteString]("Chunker.in")
  val out: Outlet[ByteString]                           = Outlet[ByteString]("Chunker.out")
  override val shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var buffer = ByteString.empty

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) emitChunk()
            else pull(in)
        }
      )
      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val elem = grab(in)
            buffer ++= elem
            emitChunk()
          }

          override def onUpstreamFinish(): Unit =
            if (buffer.isEmpty) completeStage()
            else
            // There are elements left in buffer, so
            // we keep accepting downstream pulls and push from buffer until emptied.
            //
            // It might be though, that the upstream finished while it was pulled, in which
            // case we will not get an onPull from the downstream, because we already had one.
            // In that case we need to emit from the buffer.
            if (isAvailable(out)) emitChunk()
        }
      )

      private def emitChunk(): Unit =
        if (buffer.isEmpty)
          if (isClosed(in)) completeStage()
          else pull(in)
        else {
          val (chunk, nextBuffer) = buffer.splitAt(chunkSize)
          buffer = nextBuffer
          push(out, chunk)
        }

    }
}
